// Copyright 2016-2022, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package engine

import (
	"bytes"
	"time"

	codeasset "github.com/pulumi/pulumi/pkg/v3/asset"
	"github.com/pulumi/pulumi/pkg/v3/display"
	"github.com/pulumi/pulumi/pkg/v3/resource/deploy"
	"github.com/pulumi/pulumi/sdk/v3/go/common/apitype"
	"github.com/pulumi/pulumi/sdk/v3/go/common/diag"
	"github.com/pulumi/pulumi/sdk/v3/go/common/diag/colors"
	"github.com/pulumi/pulumi/sdk/v3/go/common/resource"
	"github.com/pulumi/pulumi/sdk/v3/go/common/resource/archive"
	"github.com/pulumi/pulumi/sdk/v3/go/common/resource/asset"
	"github.com/pulumi/pulumi/sdk/v3/go/common/resource/config"
	"github.com/pulumi/pulumi/sdk/v3/go/common/resource/plugin"
	"github.com/pulumi/pulumi/sdk/v3/go/common/tokens"
	"github.com/pulumi/pulumi/sdk/v3/go/common/util/contract"
	"github.com/pulumi/pulumi/sdk/v3/go/common/util/deepcopy"
	"github.com/pulumi/pulumi/sdk/v3/go/common/util/logging"
)

// Event represents an event generated by the engine during an operation. The underlying
// type for the `Payload` field will differ depending on the value of the `Type` field
type Event struct {
	Type    EventType
	payload any
}

// StampedEvent represents an event generated by the engine during an operation which has further been stamped
// with a timestamp and sequence number.
type StampedEvent struct {
	Event
	Timestamp int
	Sequence  int
}

type EventPayload interface {
	StdoutEventPayload | DiagEventPayload | PreludeEventPayload | SummaryEventPayload |
		ResourcePreEventPayload | ResourceOutputsEventPayload | ResourceOperationFailedPayload |
		PolicyViolationEventPayload | PolicyRemediationEventPayload | PolicyLoadEventPayload | StartDebuggingEventPayload |
		PolicyAnalyzeSummaryEventPayload | PolicyRemediateSummaryEventPayload | PolicyAnalyzeStackSummaryEventPayload |
		ProgressEventPayload | ErrorEventPayload
}

func NewCancelEvent() Event {
	return Event{Type: CancelEvent}
}

func NewEvent[T EventPayload](payload T) Event {
	// We want to use deepcopy.Copy here to ensure the state is not mutated after the event is created. We need to lock
	// any *resource.State objects that are part of the payload, as other events may be trying to mutate state in
	// parallel with this event.

	var typ EventType
	switch p := any(payload).(type) {
	case StdoutEventPayload:
		typ = StdoutColorEvent
	case DiagEventPayload:
		typ = DiagEvent
	case PreludeEventPayload:
		typ = PreludeEvent
	case SummaryEventPayload:
		typ = SummaryEvent
	case ResourcePreEventPayload:
		typ = ResourcePreEvent
		p.Metadata.LockState()
		defer p.Metadata.UnlockState()
	case ResourceOutputsEventPayload:
		typ = ResourceOutputsEvent
		p.Metadata.LockState()
		defer p.Metadata.UnlockState()
	case ResourceOperationFailedPayload:
		typ = ResourceOperationFailed
		p.Metadata.LockState()
		defer p.Metadata.UnlockState()
	case PolicyViolationEventPayload:
		typ = PolicyViolationEvent
	case PolicyRemediationEventPayload:
		typ = PolicyRemediationEvent
	case PolicyLoadEventPayload:
		typ = PolicyLoadEvent
	case PolicyAnalyzeSummaryEventPayload:
		typ = PolicyAnalyzeSummaryEvent
	case PolicyRemediateSummaryEventPayload:
		typ = PolicyRemediateSummaryEvent
	case PolicyAnalyzeStackSummaryEventPayload:
		typ = PolicyAnalyzeStackSummaryEvent
	case StartDebuggingEventPayload:
		typ = StartDebuggingEvent
	case ProgressEventPayload:
		typ = ProgressEvent
	case ErrorEventPayload:
		typ = ErrorEvent
	default:
		contract.Failf("unknown event type %v", typ)
	}

	return Event{
		Type:    typ,
		payload: deepcopy.Copy(payload),
	}
}

// EventType is the kind of event being emitted.
type EventType string

const (
	CancelEvent                    EventType = "cancel"
	StdoutColorEvent               EventType = "stdoutcolor"
	DiagEvent                      EventType = "diag"
	PreludeEvent                   EventType = "prelude"
	SummaryEvent                   EventType = "summary"
	ResourcePreEvent               EventType = "resource-pre"
	ResourceOutputsEvent           EventType = "resource-outputs"
	ResourceOperationFailed        EventType = "resource-operationfailed"
	PolicyViolationEvent           EventType = "policy-violation"
	PolicyRemediationEvent         EventType = "policy-remediation"
	PolicyLoadEvent                EventType = "policy-load"
	PolicyAnalyzeSummaryEvent      EventType = "policy-analyze-summary"
	PolicyRemediateSummaryEvent    EventType = "policy-remediate-summary"
	PolicyAnalyzeStackSummaryEvent EventType = "policy-analyze-stack-summary"
	StartDebuggingEvent            EventType = "debugging-start"
	ProgressEvent                  EventType = "progress"
	ErrorEvent                     EventType = "error"
)

// ProgressType is the type of download occurring.
type ProgressType string

const (
	// PluginDownload represents a download of a plugin.
	PluginDownload ProgressType = "plugin-download"
	// PluginInstall represents the installation of a plugin.
	PluginInstall ProgressType = "plugin-install"
)

func (e Event) Payload() any {
	return e.payload
}

// Returns true if this is a ResourcePreEvent or ResourceOutputsEvent with the internal flag set.
func (e Event) Internal() bool {
	switch payload := e.payload.(type) {
	case ResourcePreEventPayload:
		return payload.Internal
	case ResourceOutputsEventPayload:
		return payload.Internal
	case StartDebuggingEventPayload:
		return true
	default:
		return false
	}
}

// Returns true if and only if this is an ephemeral event that should not be
// persisted. Ephemeral events are intended for display and reporting purposes
// only (e.g. progress).
func (e Event) Ephemeral() bool {
	switch e.payload.(type) {
	case ProgressEventPayload:
		return true
	default:
		return false
	}
}

// DiagEventPayload is the payload for an event with type `diag`
type DiagEventPayload struct {
	URN       resource.URN
	Prefix    string
	Message   string
	Color     colors.Colorization
	Severity  diag.Severity
	StreamID  int32
	Ephemeral bool
}

// PolicyViolationEventPayload is the payload for an event with type `policy-violation`.
type PolicyViolationEventPayload struct {
	ResourceURN       resource.URN
	Message           string
	Color             colors.Colorization
	PolicyName        string
	PolicyPackName    string
	PolicyPackVersion string
	EnforcementLevel  apitype.EnforcementLevel
	Prefix            string
	Severity          apitype.PolicySeverity
}

// PolicyRemediationEventPayload is the payload for an event with type `policy-remediation`.
type PolicyRemediationEventPayload struct {
	ResourceURN       resource.URN
	Color             colors.Colorization
	PolicyName        string
	PolicyPackName    string
	PolicyPackVersion string
	Before            resource.PropertyMap
	After             resource.PropertyMap
}

// PolicyLoadEventPayload is the payload for an event with type `policy-load`.
type PolicyLoadEventPayload struct{}

// PolicyAnalyzeSummaryEventPayload is the payload for an event with type `policy-analyze-summary`.
type PolicyAnalyzeSummaryEventPayload struct {
	// The URN of the resource being analyzed.
	ResourceURN resource.URN
	// The name of the policy pack.
	PolicyPackName string
	// The version of the policy pack.
	PolicyPackVersion string
	// The names of resource policies that passed (i.e. did not produce any violations).
	Passed []string
	// The names of resource policies that failed (i.e. produced violations).
	Failed []string
}

// PolicyRemediateSummaryEventPayload is the payload for an event with type `policy-remediate-summary`.
type PolicyRemediateSummaryEventPayload struct {
	// The URN of the resource being remediated.
	ResourceURN resource.URN
	// The name of the policy pack.
	PolicyPackName string
	// The version of the policy pack.
	PolicyPackVersion string
	// The names of resource policies that passed (i.e. did not produce any violations).
	Passed []string
	// The names of resource policies that failed (i.e. produced violations).
	Failed []string
}

// PolicyAnalyzeStackSummaryEventPayload is the payload for an event with type `policy-analyze-stack-summary`.
type PolicyAnalyzeStackSummaryEventPayload struct {
	// The name of the policy pack.
	PolicyPackName string
	// The version of the policy pack.
	PolicyPackVersion string
	// The names of stack policies that passed (i.e. did not produce any violations).
	Passed []string
	// The names of stack policies that failed (i.e. produced violations).
	Failed []string
}

// StartDebuggingEventPayload is the payload for an event of type `debugging-start`
type StartDebuggingEventPayload struct {
	Config map[string]any // the debug configuration (language-specific, see Debug Adapter Protocol)
}

// ProgressEventPayload is the payload for an event with type `progress`. This
// payload reports on the progress of a potentially long-running process being
// managed by the engine (e.g. a plugin download, or a plugin installation).
type ProgressEventPayload struct {
	// The type of process (e.g. plugin download, plugin install).
	Type ProgressType
	// A unique identifier for the process.
	ID string
	// A message accompanying the process.
	Message string
	// The number of items completed so far (e.g. bytes received, items installed,
	// etc.)
	Completed int64
	// The total number of items that must be completed.
	Total int64
	// True if and only if the process has completed.
	Done bool
}

type StdoutEventPayload struct {
	Message string
	Color   colors.Colorization
}

type PreludeEventPayload struct {
	IsPreview bool              // true if this prelude is for a plan operation
	Config    map[string]string // the keys and values for config. For encrypted config, the values may be blinded
}

type SummaryEventPayload struct {
	IsPreview       bool                    // true if this summary is for a plan operation
	MaybeCorrupt    bool                    // true if one or more resources may be corrupt
	Duration        time.Duration           // the duration of the entire update operation (zero values for previews)
	ResourceChanges display.ResourceChanges // count of changed resources, useful for reporting
	PolicyPacks     map[string]string       // {policy-pack: version} for each policy pack applied
}

type ResourceOperationFailedPayload struct {
	Metadata StepEventMetadata
	Status   resource.Status
	Steps    int32
}

type ResourceOutputsEventPayload struct {
	Metadata StepEventMetadata
	Planning bool
	Debug    bool
	// Internal is set for events that should not be shown to a user but are expected to be used in other parts of the
	// Pulumi system.
	Internal bool
}

type ResourcePreEventPayload struct {
	Metadata StepEventMetadata
	Planning bool
	Debug    bool
	// Internal is set for events that should not be shown to a user but are expected to be used in other parts of the
	// Pulumi system.
	Internal bool
}

type ErrorEventPayload struct {
	Error string
}

// StepEventMetadata contains the metadata associated with a step the engine is performing.
type StepEventMetadata struct {
	Op           display.StepOp                 // the operation performed by this step.
	URN          resource.URN                   // the resource URN (for before and after).
	Type         tokens.Type                    // the type affected by this step.
	Old          *StepEventStateMetadata        // the state of the resource before performing this step.
	New          *StepEventStateMetadata        // the state of the resource after performing this step.
	Res          *StepEventStateMetadata        // the latest state for the resource that is known (worst case, old).
	Keys         []resource.PropertyKey         // the keys causing replacement (only for CreateStep and ReplaceStep).
	Diffs        []resource.PropertyKey         // the keys causing diffs
	DetailedDiff map[string]plugin.PropertyDiff // the rich, structured diff
	Logical      bool                           // true if this step represents a logical operation in the program.
	Provider     string                         // the provider that performed this step.
}

func (m *StepEventMetadata) LockState() {
	if m.Old != nil && m.Old.State != nil {
		m.Old.State.Lock.Lock()
	}
	// The new state might be the same as the old state, don't try to lock it twice if it is.
	if m.New != nil && m.New.State != nil && (m.Old == nil || m.New.State != m.Old.State) {
		m.New.State.Lock.Lock()
	}
}

func (m *StepEventMetadata) UnlockState() {
	if m.New != nil && m.New.State != nil && (m.Old == nil || m.New.State != m.Old.State) {
		m.New.State.Lock.Unlock()
	}
	if m.Old != nil && m.Old.State != nil {
		m.Old.State.Lock.Unlock()
	}
}

// StepEventStateMetadata contains detailed metadata about a resource's state pertaining to a given step.
type StepEventStateMetadata struct {
	// State contains the raw, complete state, for this resource.
	State *resource.State
	// the resource's type.
	Type tokens.Type
	// the resource's object urn, a human-friendly, unique name for the resource.
	URN resource.URN
	// true if the resource is custom, managed by a plugin.
	Custom bool
	// true if this resource is pending deletion due to a replacement.
	Delete bool
	// the resource's unique ID, assigned by the resource provider (or blank if none/uncreated).
	ID resource.ID
	// an optional parent URN that this resource belongs to.
	Parent resource.URN
	// true to "protect" this resource (protected resources cannot be deleted).
	Protect bool
	// true to force replacement of this resource on the next update.
	Taint bool
	// RetainOnDelete is true if the resource is not physically deleted when it is logically deleted.
	RetainOnDelete bool `json:"retainOnDelete"`
	// the resource's input properties (as specified by the program). Note: because this will cross
	// over rpc boundaries it will be slightly different than the Inputs found in resource_state.
	// Specifically, secrets will have been filtered out, and large values (like assets) will be
	// have a simple hash-based representation.  This allows clients to display this information
	// properly, without worrying about leaking sensitive data, and without having to transmit huge
	// amounts of data.
	Inputs resource.PropertyMap
	// the resource's complete output state (as returned by the resource provider).  See "Inputs"
	// for additional details about how data will be transformed before going into this map.
	Outputs resource.PropertyMap
	// the resource's provider reference
	Provider string
	// InitErrors is the set of errors encountered in the process of initializing resource (i.e.,
	// during create or update).
	InitErrors []string
	// HideDiffs is the set of property paths where diffs are not displayed.
	HideDiffs []resource.PropertyPath
}

func makeEventEmitter(events chan<- Event, update UpdateInfo) (eventEmitter, error) {
	target := update.Target
	var secrets []string
	if target != nil && target.Config.HasSecureValue() {
		for k, v := range target.Config {
			if !v.Secure() {
				continue
			}

			secureValues, err := v.SecureValues(target.Decrypter)
			if err != nil {
				return eventEmitter{}, DecryptError{
					Key: k,
					Err: err,
				}
			}
			secrets = append(secrets, secureValues...)
		}
	}

	logging.AddGlobalFilter(logging.CreateFilter(secrets, "[secret]"))

	buffer, done := make(chan Event), make(chan bool)
	go queueEvents(events, buffer, done)

	return eventEmitter{
		done: done,
		ch:   buffer,
	}, nil
}

func makeQueryEventEmitter(events chan<- Event) (eventEmitter, error) {
	buffer, done := make(chan Event), make(chan bool)

	go queueEvents(events, buffer, done)

	return eventEmitter{
		done: done,
		ch:   buffer,
	}, nil
}

type eventEmitter struct {
	done <-chan bool
	ch   chan<- Event
}

func queueEvents(events chan<- Event, buffer chan Event, done chan bool) {
	// Instead of sending to the source channel directly, buffer events to account for slow receivers.
	//
	// Buffering is done by a goroutine that concurrently receives from the senders and attempts to send events to the
	// receiver. Events that are received while waiting for the receiver to catch up are buffered in a slice.
	//
	// We do not use a buffered channel because it is empirically less likely that the goroutine reading from a
	// buffered channel will be scheduled when new data is placed in the channel.

	defer close(done)
	contract.Assertf(buffer != nil, "buffer channel must not be nil")

	var queue []Event
	for {
		e, ok := <-buffer
		if !ok {
			return
		}
		queue = append(queue, e)

		// While there are events in the queue, attempt to send them to the waiting receiver. If the receiver is
		// blocked and an event is received from the event senders, stick that event in the queue.
		for len(queue) > 0 {
			select {
			case e, ok := <-buffer:
				if !ok {
					// If the event source has been closed, flush the queue.
					for _, e := range queue {
						trySendEvent(events, e)
					}
					return
				}
				queue = append(queue, e)
			case events <- queue[0]:
				queue = queue[1:]
			}
		}
	}
}

func makeStepEventMetadata(op display.StepOp, step deploy.Step, debug bool, showSecrets bool) StepEventMetadata {
	contract.Assertf(op == step.Op() || step.Op() == deploy.OpRefresh,
		"step must be %v or %v, got %v", op, deploy.OpRefresh, step.Op())

	var keys, diffs []resource.PropertyKey
	if keyer, hasKeys := step.(interface{ Keys() []resource.PropertyKey }); hasKeys {
		keys = keyer.Keys()
	}
	if differ, hasDiffs := step.(interface{ Diffs() []resource.PropertyKey }); hasDiffs {
		diffs = differ.Diffs()
	}

	var detailedDiff map[string]plugin.PropertyDiff
	if detailedDiffer, hasDetailedDiff := step.(interface {
		DetailedDiff() map[string]plugin.PropertyDiff
	}); hasDetailedDiff {
		detailedDiff = detailedDiffer.DetailedDiff()
	}

	return StepEventMetadata{
		Op:           op,
		URN:          step.URN(),
		Type:         step.Type(),
		Keys:         keys,
		Diffs:        diffs,
		DetailedDiff: detailedDiff,
		Old:          makeStepEventStateMetadata(step.Old(), debug, showSecrets),
		New:          makeStepEventStateMetadata(step.New(), debug, showSecrets),
		Res:          makeStepEventStateMetadata(step.Res(), debug, showSecrets),
		Logical:      step.Logical(),
		Provider:     step.Provider(),
	}
}

func makeStepEventStateMetadata(state *resource.State, debug bool, showSecrets bool) *StepEventStateMetadata {
	if state == nil {
		return nil
	}
	return &StepEventStateMetadata{
		State:          state,
		Type:           state.Type,
		URN:            state.URN,
		Custom:         state.Custom,
		Delete:         state.Delete,
		ID:             state.ID,
		Parent:         state.Parent,
		Protect:        state.Protect,
		Taint:          state.Taint,
		RetainOnDelete: state.RetainOnDelete,
		Inputs:         filterResourceProperties(state.Inputs, debug, showSecrets),
		Outputs:        filterResourceProperties(state.Outputs, debug, showSecrets),
		Provider:       state.Provider,
		InitErrors:     state.InitErrors,
		HideDiffs:      state.HideDiff,
	}
}

func (e *eventEmitter) Close() {
	tryCloseEventChan(e.ch)
	<-e.done
}

func (e *eventEmitter) sendEvent(event Event) {
	trySendEvent(e.ch, event)
}

func (e *eventEmitter) resourceOperationFailedEvent(
	step deploy.Step, status resource.Status, steps int32, debug, showSecrets bool,
) {
	contract.Requiref(e != nil, "e", "!= nil")

	e.sendEvent(NewEvent(ResourceOperationFailedPayload{
		Metadata: makeStepEventMetadata(step.Op(), step, debug, showSecrets),
		Status:   status,
		Steps:    steps,
	}))
}

func (e *eventEmitter) resourceOutputsEvent(
	op display.StepOp, step deploy.Step, planning, debug, internal, showSecrets bool,
) {
	contract.Requiref(e != nil, "e", "!= nil")

	e.sendEvent(NewEvent(ResourceOutputsEventPayload{
		Metadata: makeStepEventMetadata(op, step, debug, showSecrets),
		Planning: planning,
		Debug:    debug,
		Internal: internal,
	}))
}

func (e *eventEmitter) resourcePreEvent(
	step deploy.Step, planning, debug, internal, showSecrets bool,
) {
	contract.Requiref(e != nil, "e", "!= nil")

	e.sendEvent(NewEvent(ResourcePreEventPayload{
		Metadata: makeStepEventMetadata(step.Op(), step, debug, showSecrets),
		Planning: planning,
		Debug:    debug,
		Internal: internal,
	}))
}

func (e *eventEmitter) preludeEvent(isPreview bool, cfg config.Map) {
	contract.Requiref(e != nil, "e", "!= nil")

	configStringMap := make(map[string]string, len(cfg))
	for k, v := range cfg {
		keyString := k.String()
		valueString, err := v.Value(config.NewBlindingDecrypter())
		contract.AssertNoErrorf(err, "error getting configuration value for entry %q", keyString)
		configStringMap[keyString] = valueString
	}

	e.sendEvent(NewEvent(PreludeEventPayload{
		IsPreview: isPreview,
		Config:    configStringMap,
	}))
}

func (e *eventEmitter) summaryEvent(preview, maybeCorrupt bool, duration time.Duration,
	resourceChanges display.ResourceChanges, policyPacks map[string]string,
) {
	contract.Requiref(e != nil, "e", "!= nil")

	e.sendEvent(NewEvent(SummaryEventPayload{
		IsPreview:       preview,
		MaybeCorrupt:    maybeCorrupt,
		Duration:        duration,
		ResourceChanges: resourceChanges,
		PolicyPacks:     policyPacks,
	}))
}

func (e *eventEmitter) policyViolationEvent(urn resource.URN, d plugin.AnalyzeDiagnostic) {
	contract.Requiref(e != nil, "e", "!= nil")

	// Write prefix.
	var prefix bytes.Buffer
	//nolint:exhaustive // We only expect mandatory or advisory events here.
	switch d.EnforcementLevel {
	case apitype.Mandatory:
		prefix.WriteString(colors.SpecError)
	case apitype.Advisory:
		prefix.WriteString(colors.SpecWarning)
	default:
		contract.Failf("Unrecognized diagnostic severity: %v", d)
	}

	prefix.WriteString(string(d.EnforcementLevel))
	prefix.WriteString(": ")
	prefix.WriteString(colors.Reset)

	// Write the message itself.
	var buffer bytes.Buffer
	buffer.WriteString(colors.SpecNote)

	buffer.WriteString(d.Message)

	buffer.WriteString(colors.Reset)
	buffer.WriteRune('\n')

	e.sendEvent(NewEvent(PolicyViolationEventPayload{
		ResourceURN:       urn,
		Message:           logging.FilterString(buffer.String()),
		Color:             colors.Raw,
		PolicyName:        d.PolicyName,
		PolicyPackName:    d.PolicyPackName,
		PolicyPackVersion: d.PolicyPackVersion,
		EnforcementLevel:  d.EnforcementLevel,
		Prefix:            logging.FilterString(prefix.String()),
		Severity:          d.Severity,
	}))
}

func (e *eventEmitter) policyRemediationEvent(urn resource.URN, t plugin.Remediation,
	before resource.PropertyMap, after resource.PropertyMap,
) {
	contract.Requiref(e != nil, "e", "!= nil")

	e.sendEvent(NewEvent(PolicyRemediationEventPayload{
		ResourceURN:       urn,
		Color:             colors.Raw,
		PolicyName:        t.PolicyName,
		PolicyPackName:    t.PolicyPackName,
		PolicyPackVersion: t.PolicyPackVersion,
		Before:            before,
		After:             after,
	}))
}

func (e *eventEmitter) PolicyLoadEvent() {
	contract.Requiref(e != nil, "e", "!= nil")

	e.sendEvent(NewEvent(PolicyLoadEventPayload{}))
}

// Emit a new policy analyze summary event with the specified payload.
func (e *eventEmitter) policyAnalyzeSummaryEvent(s plugin.PolicySummary) {
	contract.Requiref(e != nil, "e", "!= nil")

	e.sendEvent(NewEvent(PolicyAnalyzeSummaryEventPayload{
		ResourceURN:       s.URN,
		PolicyPackName:    s.PolicyPackName,
		PolicyPackVersion: s.PolicyPackVersion,
		Passed:            s.Passed,
		Failed:            s.Failed,
	}))
}

// Emit a new policy remediate summary event with the specified payload.
func (e *eventEmitter) policyRemediateSummaryEvent(s plugin.PolicySummary) {
	contract.Requiref(e != nil, "e", "!= nil")

	e.sendEvent(NewEvent(PolicyRemediateSummaryEventPayload{
		ResourceURN:       s.URN,
		PolicyPackName:    s.PolicyPackName,
		PolicyPackVersion: s.PolicyPackVersion,
		Passed:            s.Passed,
		Failed:            s.Failed,
	}))
}

// Emit a new policy analyze stack summary event with the specified payload.
func (e *eventEmitter) policyAnalyzeStackSummaryEvent(s plugin.PolicySummary) {
	contract.Requiref(e != nil, "e", "!= nil")

	e.sendEvent(NewEvent(PolicyAnalyzeStackSummaryEventPayload{
		PolicyPackName:    s.PolicyPackName,
		PolicyPackVersion: s.PolicyPackVersion,
		Passed:            s.Passed,
		Failed:            s.Failed,
	}))
}

// Emit a new progress event with the specified payload.
func (e *eventEmitter) progressEvent(
	typ ProgressType,
	id string,
	message string,
	completed int64,
	total int64,
	done bool,
) {
	contract.Requiref(e != nil, "e", "!= nil")

	e.sendEvent(NewEvent(ProgressEventPayload{
		Type:      typ,
		ID:        id,
		Message:   message,
		Completed: completed,
		Total:     total,
		Done:      done,
	}))
}

func diagEvent(e *eventEmitter, d *diag.Diag, prefix, msg string, sev diag.Severity,
	ephemeral bool,
) {
	contract.Requiref(e != nil, "e", "!= nil")

	e.sendEvent(NewEvent(DiagEventPayload{
		URN:       d.URN,
		Prefix:    logging.FilterString(prefix),
		Message:   logging.FilterString(msg),
		Color:     colors.Raw,
		Severity:  sev,
		StreamID:  d.StreamID,
		Ephemeral: ephemeral,
	}))
}

func (e *eventEmitter) diagDebugEvent(d *diag.Diag, prefix, msg string, ephemeral bool) {
	diagEvent(e, d, prefix, msg, diag.Debug, ephemeral)
}

func (e *eventEmitter) diagInfoEvent(d *diag.Diag, prefix, msg string, ephemeral bool) {
	diagEvent(e, d, prefix, msg, diag.Info, ephemeral)
}

func (e *eventEmitter) diagInfoerrEvent(d *diag.Diag, prefix, msg string, ephemeral bool) {
	diagEvent(e, d, prefix, msg, diag.Infoerr, ephemeral)
}

func (e *eventEmitter) diagErrorEvent(d *diag.Diag, prefix, msg string, ephemeral bool) {
	diagEvent(e, d, prefix, msg, diag.Error, ephemeral)
}

func (e *eventEmitter) diagWarningEvent(d *diag.Diag, prefix, msg string, ephemeral bool) {
	diagEvent(e, d, prefix, msg, diag.Warning, ephemeral)
}

func (e *eventEmitter) startDebugging(info plugin.DebuggingInfo) {
	contract.Requiref(e != nil, "e", "!= nil")
	e.sendEvent(NewEvent(StartDebuggingEventPayload{
		Config: info.Config,
	}))
}

func filterResourceProperties(m resource.PropertyMap, debug bool, showSecrets bool) resource.PropertyMap {
	return filterPropertyValue(resource.NewProperty(m), debug, showSecrets).ObjectValue()
}

func filterPropertyValue(v resource.PropertyValue, debug bool, showSecrets bool) resource.PropertyValue {
	switch {
	case v.IsNull(), v.IsBool(), v.IsNumber():
		return v
	case v.IsString():
		// have to ensure we filter out secrets.
		return resource.NewProperty(logging.FilterString(v.StringValue()))
	case v.IsAsset():
		return resource.NewProperty(filterAsset(v.AssetValue(), debug))
	case v.IsArchive():
		return resource.NewProperty(filterArchive(v.ArchiveValue(), debug))
	case v.IsArray():
		arr := make([]resource.PropertyValue, len(v.ArrayValue()))
		for i, v := range v.ArrayValue() {
			arr[i] = filterPropertyValue(v, debug, showSecrets)
		}
		return resource.NewProperty(arr)
	case v.IsObject():
		obj := make(resource.PropertyMap, len(v.ObjectValue()))
		for k, v := range v.ObjectValue() {
			obj[k] = filterPropertyValue(v, debug, showSecrets)
		}
		return resource.NewProperty(obj)
	case v.IsComputed():
		return resource.MakeComputed(filterPropertyValue(v.Input().Element, debug, showSecrets))
	case v.IsOutput():
		return resource.MakeComputed(filterPropertyValue(v.OutputValue().Element, debug, showSecrets))
	case v.IsSecret() && showSecrets:
		return resource.NewProperty(v.SecretValue())
	case v.IsSecret():
		return resource.MakeSecret(resource.NewProperty("[secret]"))
	case v.IsResourceReference():
		ref := v.ResourceReferenceValue()
		return resource.NewProperty(resource.ResourceReference{
			URN:            resource.URN(logging.FilterString(string(ref.URN))),
			ID:             filterPropertyValue(ref.ID, debug, showSecrets),
			PackageVersion: logging.FilterString(ref.PackageVersion),
		})
	default:
		contract.Failf("unexpected property value type %T", v.V)
		return resource.PropertyValue{}
	}
}

func filterAsset(v *asset.Asset, debug bool) *asset.Asset {
	if !v.IsText() {
		return v
	}

	// we don't want to include the full text of an asset as we serialize it over as
	// events.  They represent user files and are thus are unbounded in size.  Instead,
	// we only include the text if it represents a user's serialized program code, as
	// that is something we want the receiver to see to display as part of
	// progress/diffs/etc.
	var text string
	if codeasset.IsUserProgramCode(v) {
		// also make sure we filter this in case there are any secrets in the code.
		text = logging.FilterString(codeasset.MassageIfUserProgramCodeAsset(v, debug).Text)
	} else {
		// We need to have some string here so that we preserve that this is a
		// text-asset
		text = "<contents elided>"
	}

	return &asset.Asset{
		Sig:  v.Sig,
		Hash: v.Hash,
		Text: text,
	}
}

func filterArchive(v *archive.Archive, debug bool) *archive.Archive {
	if !v.IsAssets() {
		return v
	}

	assets := make(map[string]any)
	for k, v := range v.Assets {
		switch v := v.(type) {
		case *asset.Asset:
			assets[k] = filterAsset(v, debug)
		case *archive.Archive:
			assets[k] = filterArchive(v, debug)
		default:
			contract.Failf("Unrecognized asset map type %T", v)
		}
	}
	return &archive.Archive{
		Sig:    v.Sig,
		Hash:   v.Hash,
		Assets: assets,
	}
}

// Sends an event like a normal send but recovers from a panic on a
// closed channel. This is generally a design smell and should be used
// very sparingly and every use of this function needs to document the
// need.
//
// eventEmitter uses tryEventSend to recover in the scenario of
// cancelSource.Terminate being called (such as user pressing Ctrl+C
// twice), when straggler stepExecutor workers are sending diag events
// but the engine is shutting down.
//
// See https://github.com/pulumi/pulumi/issues/10431 for the details.
func trySendEvent(ch chan<- Event, ev Event) (sent bool) {
	sent = true
	defer func() {
		if recover() != nil {
			sent = false
			if logging.V(9) {
				logging.V(9).Infof(
					"Ignoring %v send on a closed channel %p",
					ev.Type, ch)
			}
		}
	}()
	ch <- ev
	return sent
}

// Tries to close a channel but recovers from a panic of closing a
// closed channel. Restrictions on use are similarly to those of
// trySendEvent.
func tryCloseEventChan(ch chan<- Event) (closed bool) {
	closed = true
	defer func() {
		if recover() != nil {
			closed = false
			if logging.V(9) {
				logging.V(9).Infof(
					"Ignoring close of a closed event channel %p",
					ch)
			}
		}
	}()
	close(ch)
	return closed
}
